Amazon RedshiftからAmazon Kinesisのストリームデータをニアリアルタイムにインジェストしてみた

Amazon RedshiftからAmazon Kinesisのストリームデータをニアリアルタイムにインジェストしてみた

Clock Icon2022.05.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon RedshiftからAmazon Kinesis Data Streamのストリームデータをマテリアライズド・ビュー経由でニアリアルタイムにインジェストする機能が2022年2月からパブリック・プレビュー提供されています。

Amazon Redshift announces public preview of Streaming Ingestion for Kinesis Data Streams

Amazon Kinesis Data StreamのストリームデータをRedshiftから利用したい場合、従来は Amazon Kinesis Data Firehoseを経由して一度 S3 に出力し、S3 データを Redshift に COPY する必要があり、ストイームデータのリアルタイム性が損なわれていました。

本機能を利用すると、マテリアライズド・ビューの更新レイテンシーが発生するものの、ニアリアルタイムにKinesis Data Streamsのデータをインジェストできます。

やってみた

Kinesis Data StreamsにPUTされたレコードをRedshiftからSQLで参照するところまでを動作確認します。

1. Kinesis Data Stream の作成

ストリームデータを送信する Kinesis Data Stream を作成します。

このストリームには、以下の様なフォーマットのレコードを送信します。

{
  "user": 2,
  "heartrate": 143,
  "power": 194,
  "cadence": 93,
  "timestamp": "2022-05-20 14:50:09"
}

2. Redshift に Kinesis Data Streams用ポリシーを付与

RedshiftがKinesis Data Streamsにアクセスできるよう、Redshiftクラスターに、以下のポリシーを付与したIAMロールを適用します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

3. Kinesis用外部スキーマの作成

Amazon Redshift Spectrumと同様に、Redshift外のデータベースである Kinesis Data Streams を参照するために、外部スキーマを作成します。

CREATE EXTERNAL SCHEMA schema_one
FROM KINESIS
IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

IAM_ROLE には、手順2 で適用した IAM ロールの ARN を設定します。

4. マテリアライズド・ビューの作成

Kinesisの基本情報とレコード(Data)だけを取得するマテリアライズド・ビューを作成します。

CREATE MATERIALIZED VIEW view_foo AS
SELECT
  approximatearrivaltimestamp,
  partitionkey,
  shardid,
  sequencenumber,
  JSON_PARSE(from_varbyte(Data, 'utf-8')) as Data
FROM
  schema_one.YOUR_STREAM_NAME

YOUR_STREAM_NAME には 手順1で作成したストリーム名を設定します。

5. マテリアライズド・ビューの更新

KinesisからRedshiftのマテリアライズド・ビューにデータを引っ張ってくるために、ビューをリフレッシュします。

REFRESH MATERIALIZED VIEW view_foo

初回リフレッシュ時には、ストリームに存在するすべてのデータを同期します(ストリームのチェックポイントはTRIM_HORIZON)。以降は差分同期です。

6. マテリアライズド・ビューの参照

ストリームデータを覗いてみます。

select *
from view_foo
limit 5
approximatearrivaltimestamp partitionkey shardid sequencenumber data
2022-05-20 14:50:10 881de89a8c61475a82ecb424f373effc shardId-000000000000 49629688051866322531082401886402786968421632977418584066 {"user":2,"heartrate":143,"power":194,"cadence":93,"timestamp":"2022-05-20 14:50:09"}
2022-05-20 14:50:10 0c9a133651a846fe87bd1680d45b286c shardId-000000000000 49629688051866322531082401886403995894241247606593290242 {"user":1,"heartrate":146,"power":218,"cadence":97,"timestamp":"2022-05-20 14:50:10"}
2022-05-20 14:50:12 45ec6b0130c34bd4afc13a4e9b3c4972 shardId-000000000000 49629688051866322531082401886407622671700091562836885506 {"user":1,"heartrate":139,"power":209,"cadence":93,"timestamp":"2022-05-20 14:50:11"}
2022-05-20 14:50:15 5c7b5efa1d9940e0a72ab06e3d0f9b5b shardId-000000000000 49629688051866322531082401886414876226617779544043552770 {"user":2,"heartrate":137,"power":186,"cadence":90,"timestamp":"2022-05-20 14:50:14"}
2022-05-20 14:50:17 083b690087414ce19d48eb9aefa048fd shardId-000000000000 49629688051866322531082401886419711929896238266900807682 {"user":2,"heartrate":129,"power":175,"cadence":85,"timestamp":"2022-05-20 14:50:16"}%

Kinesis Data Streamsのデータを取得できています。

パーティションキー(partitionkey)やシャードID(shardid)などのKinesisストリームの付加情報はデバッグなどで活用しましょう。

7. 複雑なマテリアライズド・ビューの定義

Kinesis には JSON 形式のレコードが送信されています。 属性ごとに展開したマテリアライズド・ビューを定義します。

CREATE MATERIALIZED VIEW view_foo_extract DISTKEY(2) sortkey(1) AS
SELECT
  approximatearrivaltimestamp,
  json_extract_path_text(from_varbyte(data, 'utf-8'), 'user') :: INT as user_id,
  json_extract_path_text(from_varbyte(data, 'utf-8'), 'timestamp') :: varchar(20) as generated_time,
  json_extract_path_text(from_varbyte(data, 'utf-8'), 'power') :: INT as power,
  json_extract_path_text(from_varbyte(data, 'utf-8'), 'cadence') :: INT as cadence,
  json_extract_path_text(from_varbyte(data, 'utf-8'), 'heartrate') :: INT as heartrate
FROM
  schema_one.foo;

このビューでは、さらに以下を行っています。

  • ユーザーID(user_id)でデータ分散
  • Kinesisにレコード送信された時刻(approximatearrivaltimestamp)でソート

先程と同じく、ビューをリフレッシュし、レコードを確認します。

REFRESH MATERIALIZED VIEW view_foo_extract

select *
from view_foo_extract
limit 5
approximatearrivaltimestamp user_id generated_time power cadence heartrate
2022-05-20 14:50:10 2 2022-05-20 14:50:09 194 93 143
2022-05-20 14:50:11 2 2022-05-20 14:50:10 191 92 141
2022-05-20 14:50:11 5 2022-05-20 14:50:11 227 97 150
2022-05-20 14:50:13 2 2022-05-20 14:50:12 191 92 141
2022-05-20 14:50:13 5 2022-05-20 14:50:13 221 94 146

過去3分のレコードを対象に ユーザx分単位でデータをサマってみましょう。

select
  user_id,
  to_timestamp(generated_time, 'YYYY-MM-DD HH24:MI') as minute,
  avg(power)
from
  view_foo_extract
where
  approximatearrivaltimestamp > current_timestamp - interval '3 minutes'
group by
  user_id,
  to_timestamp(generated_time, 'YYYY-MM-DD HH24:MI')
order by
  1,
  2
user_id minute avg
1 2022-05-20 17:20:00+00 461
1 2022-05-20 17:21:00+00 371
1 2022-05-20 17:22:00+00 424
2 2022-05-20 17:20:00+00 110
2 2022-05-20 17:21:00+00 98
2 2022-05-20 17:22:00+00 64
5 2022-05-20 17:20:00+00 257
5 2022-05-20 17:21:00+00 241
5 2022-05-20 17:22:00+00 260

8. ニアリアルタイム処理の確認

ニアリアルタイムに処理できていることを確認するために、マテリアライズド・ビューをリフレッシュ後、過去3分を対象に、Kinesisにレコード送信された時刻(approximatearrivaltimestamp)の最小、最大値を確認します。

SELECT current_timestamp,
       min(approximatearrivaltimestamp),
       max(approximatearrivaltimestamp)
FROM view_foo
WHERE approximatearrivaltimestamp > current_timestamp - interval '3 minutes'
timestamptz min max
2022-05-20 17:16:58.891641+00 2022-05-20 17:13:59 2022-05-20 17:16:49
  • 現在時刻 17:16:58.891641 に対して、最新のレコードは9秒前の 17:16:49
  • 3分前は 17:13:58.891641 に対して、3分以内の最古のレコードはほぼ同時刻の 17:13:59

とニアリアルタイムに処理されています。

ストリームデータはビューをリフレッシュしないと更新されないため、クエリーのスケジュール機能を利用し、マテリアライズド・ビューのリフレッシュを定期実行しましょう。

アーキテクチャー

AWSの機能紹介動画で、アーキテクチャーが紹介されていました。

永続的なデータ向けテーブル(permanent table)とストリームデータ用テーブル(Streaming Table)が別れていますね。

さらに、ストリーム用テーブルには Kinesis Data Streams だけでなく、"Amazon Managed Kafka Service ... others"の文字も見えます。

Redshiftがストリームデータに対応し、その第一弾として Kinesis Data Streams 対応が発表されたとみなせ、今後も続々とストリーム対応が進みそうです。

まとめ

Amazon RedshiftからAmazon Kinesis Data Streamのストリームデータをマテリアライズド・ビュー経由でニアリアルタイムにインジェストする機能がプレビュー提供されています。

データパイプラインの観点からは、Firehose・S3を挟むことで構成が複雑になっていたり、データ処理までのレイテンシーが発生するといった課題が解消されます。

フェデレーテッド・クエリの観点からは、Redshiftが新たにストリームデータにも対応したとみなせます。

本機能はKinesis Data StreamsとシームレスにSQL連携できるため、将来的にはKinesis Data Analyticsが担っていた処理の一部もRedshiftへの移行が進むでしょう。

繰り返しとなりますが、本機能はパブリックプレビューで提供されているため、正式版のリリースまでに機能や仕様は変更される可能性があります。 あくまでも検証目的でご利用ください。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.